package com.xiaohu.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
    数据生成器 导入依赖：flink-connector-datagen
     flink有自己的一套数据类型系统，并且为每一个类型提供了序列化器和反序列化器，继承了TypeInformation抽象类

     如果是自己自定义的类型的话，pojo有4个要求：
        1、类一定是公共的
        2、一定要有一个无参的构造方法
        3、所有属性是可访问的，公共或者提供get和set
        4、所有属性的类型是可以序列化的

     flink提供了一个封装好的类型类：Types
 */
public class DataGeneratorDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //设置全局并行度1
        // 多个并行度均分范围去生成，例如最大值100，并行度3，三个并行度：0开始  33开始  66开始


        DataGeneratorSource<String> generatorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number: " + value;
                    }
                },                                                                  // 数据生成器的参数 输入类型固定Long,不能改
                10,                                                                // 数据生成范围，也可以理解次数0-9
                // 无界流的效果Long.MAX_VALUE
                RateLimiterStrategy.perSecond(1),                   //限速每秒钟生成1条数据
                Types.STRING                                                        // 设置返回类型
        );

        DataStreamSource<String> source = env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        source.print();


        env.execute();
    }
}
